/*
 * CRtpSink.cpp
 *
 *  Created on: 2016年4月29日
 *      Author: terry
 */

#include "CRtpSink.h"
#include "RtpPort.h"
#include "CLog.h"
#include "CRtpPackager.h"
#include "H264RtpPackager.h"
#include "H265RtpPackager.h"
#include "TStringUtil.h"
#include "TFileUtil.h"
#include <errno.h>


using namespace jrtplib;

static const int RTP_TIMEOUT = 10; // seconds


namespace av
{

CRtpSink::CRtpSink():
		m_port(),
		m_clockRate(90000),
		m_cb(),
		m_context(),
		m_codec(),
		m_payload(),
		m_lastSeq(-1),
        m_packager(new CRtpPackager())
{
}

CRtpSink::~CRtpSink()
{
	close();
}

int CRtpSink::open(int port)
{
	if (port <= 0)
	{
		for (size_t i = 0; i < RtpPort::count(); ++ i)
		{
			port = RtpPort::make();
			if (openSession(port))
			{
				break;
			}
		}
	}
    else
    {
        if ((port % 2) != 0)
        {
            port = (port / 2) * 2;
        }

        if (!openSession(port))
        {
    	    return EEXIST;
        }
    }

	m_port = port;
	m_name = comn::StringUtil::format("%d", m_port);

	start();

	return 0;
}

void CRtpSink::close()
{
	if (isRunning())
	{
		stop();
	}
}

bool CRtpSink::isOpen()
{
	return isRunning();
}

void CRtpSink::setCallback(RtpSinkCallback cb, void* context)
{
	comn::AutoCritSec lock(m_cs);

	m_cb = cb;
	m_context = context;
}

void CRtpSink::setFormat(int codec, int clockRate)
{
	comn::AutoCritSec lock(m_cs);

	m_codec = codec;
	m_clockRate = clockRate;

	if (m_codec == MEDIA_CODEC_H264)
	{
		m_packager.reset(new H264RtpPackager());
	}
	//else if (m_codec == MEDIA_CODEC_HEVC)
	//{
	//	m_packager.reset(new H265RtpPackager());
	//}
	else if (m_codec == MEDIA_CODEC_G711A)
	{
		m_packager.reset(new CRtpPackager(MEDIA_TYPE_AUDIO));
	}
	else if (m_codec == MEDIA_CODEC_G711U)
	{
		m_packager.reset(new CRtpPackager(MEDIA_TYPE_AUDIO));
	}
	else
	{
		m_packager.reset(new CRtpPackager(MEDIA_TYPE_NONE));
	}
}

void CRtpSink::setCacheSize(int size)
{
	if (size < 0)
	{
		size = 0;
	}

	m_orderCache.setFullSize(size);
}

int CRtpSink::getPort()
{
	return m_port;
}

int CRtpSink::run()
{
    while (!m_canExit)
    {
        Poll();

        bool dataAvailable = true;

        RTPTime timeout(RTP_TIMEOUT, 0);
        WaitForIncomingData(timeout, &dataAvailable);

        if (dataAvailable == false)
        {
            continue;
        }

        BeginDataAccess();

        bool working = GotoFirstSourceWithData();
        while (working && !m_canExit)
        {
            //process packet
            RTPPacket* pPacket = NULL;
            pPacket = GetNextPacket();
            while (pPacket)
            {
                //tracePacket(pPacket);
                onRtpPacket(pPacket);

                if (m_canExit)
                {
                    break;
                }

                pPacket = GetNextPacket();
            }

            working = GotoNextSourceWithData();
        }

        EndDataAccess();
    }

    printFlowStat();

    return 0;
}

void CRtpSink::doStop()
{
	AbortWait();
}

bool CRtpSink::startup()
{
	return true;
}

void CRtpSink::cleanup()
{
	// pass
}



void CRtpSink::onRtpPacket(jrtplib::RTPPacket* pPacket)
{
	if (!filterPacket(pPacket))
	{
		DeletePacket(pPacket);
		return;
	}

	if (m_orderCache.fullSize() != 0)
	{
		comn::AutoCritSec lock(m_cs);
		m_orderCache.push(pPacket);
		if (!m_orderCache.isReady())
		{
			return;
		}

		pPacket = m_orderCache.pop();

	}

	if (!pPacket)
	{
		return;
	}

	uint16_t seq = pPacket->GetSequenceNumber();

	/// 序号重复, 认为是相同的包
	if (seq == m_lastSeq)
	{
		CLog::warning("RtpMediaSource(%s) found packet with same seq(%d)\n", getName().c_str(), seq);
		DeletePacket(pPacket);
		return;
	}

	m_recvState.update(pPacket->GetTimestamp());

	if (m_recvState.m_packetCount == 1)
	{
		m_lastSeq = seq;

		CLog::debug("RtpMediaSource(%s) first rtp packet. clock:%s, seq:%d, ts:%d\n",
				getName().c_str(), CLog::getShortTime(), seq, pPacket->GetTimestamp());
	}
	else
	{
		if (seq == (m_lastSeq+1))	// 正常
		{
		}
		else if (seq > (m_lastSeq+1))	// 跳跃
		{
			int lostCount = seq - (m_lastSeq+1);
			m_flowmeter.lostPacket(lostCount);
			CLog::warning("RtpMediaSource(%s) bump. last:%d, now:%d, lost:%d\n",
					getName().c_str(), m_lastSeq, seq, lostCount);
		}
		else	// 晚到的包
		{
			CLog::warning("RtpMediaSource(%s) late. last:%d, now:%d\n", getName().c_str(), m_lastSeq, seq);
		}

		m_lastSeq = seq;
	}

	dealRtpPacket(pPacket);
}

bool CRtpSink::filterPacket(jrtplib::RTPPacket* pPacket)
{
	comn::AutoCritSec lock(m_cs);

	uint8_t pt = pPacket->GetPayloadType();

	if ((m_payload >= 0) && (m_payload == pt))
	{
		return true;
	}

	return true;
}

void CRtpSink::dealRtpPacket(jrtplib::RTPPacket* pPacket)
{
	RtpPacket pkt;
	assignFrom(pkt, pPacket);

	MediaPacket pktOut;
	if (joinPacket(pkt, pktOut))
	{
		fireMediaPacket(pktOut);

        //CLog::debug("pkt. size:%5d, pts:%I64d, flags:%d\n", pktOut.size, pktOut.pts, pktOut.flags);
        //comn::FileUtil::write(pktOut.data, pktOut.size, "h264_src.h264", true);
	}

	DeletePacket(pPacket);
}



void CRtpSink::assignFrom(RtpPacket& pkt, jrtplib::RTPPacket* pPacket)
{
    uint32_t ts = pPacket->GetTimestamp();
    uint8_t* pPayload = pPacket->GetPayloadData();
    size_t  length = pPacket->GetPayloadLength();

    memset(&pkt, 0, sizeof(pkt));
    pkt.data = pPayload;
    pkt.size = length;
    pkt.ts = ts;
    pkt.mark = pPacket->HasMarker();
    pkt.pt = pPacket->GetPayloadType();
}

MediaPacketPtr CRtpSink::acquire(size_t length)
{
	MediaPacketPtr pkt(new MediaPacket());
	pkt->ensure(length);
	return pkt;
}

bool CRtpSink::joinPacket(RtpPacket& pkt, MediaPacket& pktOut)
{
	bool done = false;
    comn::AutoCritSec lock(m_cs);
    if (m_packager && m_packager->join(pkt, pktOut))
    {
        done = true;
    }
    return done;
}

std::string CRtpSink::getName()
{
    return m_name;
}

void CRtpSink::fireMediaPacket(MediaPacket& packet)
{
    if (m_cb)
    {
        (*m_cb)(this, packet.data, packet.size, packet.pts, packet.flags, m_context);
    }
}


void CRtpSink::printFlowStat()
{
    double bytes = (double)(m_flowmeter.getTotalBytes() / 1000) / 1000;
    int64_t pktCount = m_flowmeter.getCounter();
    int lostCount = (int)m_flowmeter.getLostCount();
    double lostRate = 0;
    if (pktCount > 0)
    {
        lostRate = ((double)lostCount) / (pktCount + lostCount) * 100;
    }

    CLog::info("RtpMediaSource(%s) bandwidth:%.02fKbps, bytes:%.02fM, packet:%.02fK, dropped:%d, rate:%.02f%%\n",
        getName().c_str(),
        m_flowmeter.getAvgRate(),
        bytes,
        (double)(pktCount /1000.0),
        lostCount,
        lostRate
        );
}

bool CRtpSink::openSession(int port)
{
	RTPUDPv4TransmissionParams transParams;
	RTPSessionParams sessparams;
	sessparams.SetOwnTimestampUnit(1.0/m_clockRate);
	sessparams.SetAcceptOwnPackets(true);
	sessparams.SetUsePollThread(false);

	transParams.SetRTPReceiveBuffer(1024 * 1024 * 4);

	transParams.SetPortbase(port);

	int ret = Create(sessparams, &transParams);
	return (ret == 0);
}





} /* namespace av */
